---
id: event-bus
title: 22. 事件总线
sidebar_label: 22. 事件总线（New）
---

import useBaseUrl from "@docusaurus/useBaseUrl";

:::warning v2.20 以下版本说明

**在 `Furion v2.20+` 版本后采用 [Jaina](https://gitee.com/dotnetchina/Jaina) 事件总线替换原有的 `EventBus`**，[查看旧文档](./event-bus-old)

:::

## 25.1 快速入门

1. 定义事件订阅者 `ToDoEventSubscriber`：

```cs {2,10-11,19-21}
// 实现 IEventSubscriber 接口
public class ToDoEventSubscriber : IEventSubscriber
{
    private readonly ILogger<ToDoEventSubscriber> _logger;
    public ToDoEventSubscriber(ILogger<ToDoEventSubscriber> logger)
    {
        _logger = logger;
    }

    [EventSubscribe("ToDo:Create")]
    public async Task CreateToDo(EventHandlerExecutingContext context)
    {
        var todo = context.Source;
        _logger.LogInformation("创建一个 ToDo：{Name}", todo.Payload);
        await Task.CompletedTask;
    }

    // 支持多个
    [EventSubscribe("ToDo:Create")]
    [EventSubscribe("ToDo:Update")]
    public async Task CreateOrUpdateToDo(EventHandlerExecutingContext context)
    {
        var todo = context.Source;
        _logger.LogInformation("创建或更新一个 ToDo：{Name}", todo.Payload);
        await Task.CompletedTask;
    }
}
```

2. 创建控制器 `ToDoController`，依赖注入 `IEventPublisher` 服务：

```cs {4,13}
public class ToDoController : ControllerBase
{
    // 依赖注入事件发布者 IEventPublisher
    private readonly IEventPublisher _eventPublisher;
    public ToDoController(IEventPublisher eventPublisher)
    {
        _eventPublisher = eventPublisher;
    }

    // 发布 ToDo:Create 消息
    public async Task CreateDoTo(string name)
    {
        await _eventPublisher.PublishAsync(new ChannelEventSource("ToDo:Create", name));
    }
}
```

3. 在 `Startup.cs` 注册 `EventBus` 服务：

```cs {2,5}
// 注册 EventBus 服务
services.AddEventBus(buidler =>
{
    // 注册 ToDo 事件订阅者
    buidler.AddSubscriber<ToDoEventSubscriber>();
});
```

4. 运行项目：

```bash
info: Jaina.Samples.ToDoEventSubscriber[0]
      创建一个 ToDo：Jaina
```

## 25.2 自定义事件源

`Furion` 使用 `IEventSource` 作为消息载体，任何实现该接口的类都可以充当消息载体。

如需自定义，只需实现 `IEventSource` 接口即可：

```cs {1,15,20,26,31}
public class ToDoEventSource : IEventSource
{
    public ToDoEventSource(string eventId, string todoName)
    {
        EventId = eventId;
        ToDoName = todoName;
    }

    // 自定义属性
    public string ToDoName { get; }

    /// <summary>
    /// 事件 Id
    /// </summary>
    public string EventId { get; }

    /// <summary>
    /// 事件承载（携带）数据
    /// </summary>
    public object Payload { get; }

    /// <summary>
    /// 取消任务 Token
    /// </summary>
    /// <remarks>用于取消本次消息处理</remarks>
    public CancellationToken CancellationToken { get; }

    /// <summary>
    /// 事件创建时间
    /// </summary>
    public DateTime CreatedTime { get; } = DateTime.UtcNow;
}
```

使用：

```cs
await _eventPublisher.PublishAsync(new ToDoEventSource ("ToDo:Create", "我的 ToDo Name"));
```

## 25.3 自定义事件源存储器

`Fruion` 默认采用 `Channel` 作为事件源 `IEventSource` 存储器，开发者可以使用任何消息队列组件进行替换，如 `Kafka、RabbitMQ、ActiveMQ` 等，也可以使用部分数据库 `Redis、SQL Server、MySql` 实现。

如需自定义，只需实现 `IEventSourceStorer` 接口即可：

```cs {1,11,17}
public class RedisEventSourceStorer : IEventSourceStorer
{
    private readonly IRedisClient _redisClient;

    public RedisEventSourceStorer(IRedisClient redisClient)
    {
        _redisClient = redisClient;
    }

    // 往 Redis 中写入一条
    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
    {
        await _redisClient.WriteAsync(...., cancellationToken);
    }

    // 从 Redis 中读取一条
    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
    {
       return await _redisClient.ReadAsync(...., cancellationToken);
    }
}
```

最后，在注册 `EventBus` 服务中替换默认 `IEventSourceStorer`：

```cs {4-8}
services.AddEventBus(buidler =>
{
    // 替换事件源存储器
    buidler.ReplaceStorer(serviceProvider =>
    {
        var redisClient = serviceProvider.GetService<IRedisClient>();
        return new RedisEventSourceStorer(redisClient);
    });
});
```

## 25.4 自定义事件发布者

`Furion` 默认内置基于 `Channel` 的事件发布者 `ChannelEventPublisher`。

如需自定义，只需实现 `IEventPublisher` 接口即可：

```cs {1,10}
public class ToDoEventPublisher : IEventPublisher
{
    private readonly IEventSourceStorer _eventSourceStorer;

    public ChannelEventPublisher(IEventSourceStorer eventSourceStorer)
    {
        _eventSourceStorer = eventSourceStorer;
    }

    public async Task PublishAsync(IEventSource eventSource)
    {
        await _eventSourceStorer.WriteAsync(eventSource, eventSource.CancellationToken);
    }
}
```

最后，在注册 `EventBus` 服务中替换默认 `IEventPublisher`：

```cs {4}
services.AddEventBus(buidler =>
{
    // 替换事件源存储器
    buidler.ReplacePublisher<ToDoEventPublisher>();
});
```

## 25.5 添加事件执行监视器

`Furion` 提供了 `IEventHandlerMonitor` 监视器接口，实现该接口可以监视所有订阅事件，包括 `执行之前、执行之后，执行异常，共享上下文数据`。

如添加 `ToDoEventHandlerMonitor`：

```cs {1,9,15}
public class ToDoEventHandlerMonitor : IEventHandlerMonitor
{
    private readonly ILogger<ToDoEventHandlerMonitor> _logger;
    public ToDoEventHandlerMonitor(ILogger<ToDoEventHandlerMonitor> logger)
    {
        _logger = logger;
    }

    public Task OnExecutingAsync(EventHandlerExecutingContext context)
    {
        _logger.LogInformation("执行之前：{EventId}", context.Source.EventId);
        return Task.CompletedTask;
    }

    public Task OnExecutedAsync(EventHandlerExecutedContext context)
    {
        _logger.LogInformation("执行之后：{EventId}", context.Source.EventId);

        if (context.Exception != null)
        {
            _logger.LogError(context.Exception, "执行出错啦：{EventId}", context.Source.EventId);
        }

        return Task.CompletedTask;
    }
}
```

最后，在注册 `EventBus` 服务中注册 `ToDoEventHandlerMonitor`：

```cs {4}
services.AddEventBus(buidler =>
{
    // 添加事件执行监视器
    buidler.AddMonitor<ToDoEventHandlerMonitor>();
});
```

## 25.6 添加事件执行器

`Furion` 提供了 `IEventHandlerExecutor` 执行器接口，可以让开发者自定义事件处理函数执行策略，如 `超时控制，失败重试、熔断等等`。

如添加 `RetryEventHandlerExecutor`：

```cs {1,3}
public class RetryEventHandlerExecutor : IEventHandlerExecutor
{
    public async Task ExecuteAsync(EventHandlerExecutingContext context, Func<EventHandlerExecutingContext, Task> handler)
    {
        // 如果执行失败，每隔 1s 重试，最多三次
        await Retry(async () => {
            await handler(context);
        }, 3, 1000);
    }
}
```

最后，在注册 `EventBus` 服务中注册 `RetryEventHandlerExecutor`：

```cs {4}
services.AddEventBus(buidler =>
{
    // 添加事件执行器
    buidler.AddExecutor<RetryEventHandlerExecutor>();
});
```

## 25.6 使用有作用域的服务

`Furion` 所有服务均注册为单例，如需使用作用域服务（单例服务可直接注入），可通过依赖注入 `IServiceProvider` 实例并通过 `CreateScope()` 创建一个作用域，如：

```cs {5,8,12,17-21}
public class ToDoEventSubscriber : IEventSubscriber
{
    private readonly ILogger<ToDoEventSubscriber> _logger;

    public ToDoEventSubscriber(IServiceProvider services
        , ILogger<ToDoEventSubscriber> logger)
    {
        Services = services;
        _logger = logger;
    }

    public IServiceProvider Services { get; }

    [EventSubscribe("ToDo:Create")]
    public async Task CreateToDo(EventHandlerExecutingContext context)
    {
        using (var scope = Services.CreateScope())
        {
            var scopedProcessingService = scope.ServiceProvider.GetRequiredService<IScopedProcessingService>();
            // ....
        }
    }
}
```

## 22.7 订阅执行任务意外异常

```cs {4}
services.AddEventBus(buidler =>
{
    // 订阅 EventBus 未捕获异常
    buidler.UnobservedTaskExceptionHandler = (obj, args) =>
    {
        // ....
    };
});
```

## 22.8 `EventBusOptionsBuilder` 配置

`EventBusOptionsBuilder` 是 `AddEventBus` 构建服务选项，该选项包含以下属性和方法：

- 属性
  - `ChannelCapacity`：默认内存通道容量
  - `UnobservedTaskExceptionHandler`：订阅执行任务未察觉异常
- 方法
  - `AddSubscriber<TEventSubscriber>`：添加订阅者
  - `ReplacePublisher<TEventPublisher>`：替换发布者
  - `ReplaceStorer(Func<IServiceProvider, IEventSourceStorer>)`：替换存储器
  - `AddMonitor<TEventHandlerMonitor>`：添加监视器
  - `AddExecutor<TEventHandlerExecutor>`：添加执行器


## 22.9 反馈与建议

:::note 与我们交流

给 Furion 提 [Issue](https://gitee.com/dotnetchina/Furion/issues/new?issue)。

:::
